import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import static org.apache.hadoop.metrics2.impl.MsInfo.Context;

public class IntervalJoin {

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<Transcript> input1=env.fromElements(TRANSCRIPTS).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Transcript>() {

            @Override

            public long extractAscendingTimestamp(Transcript element) {

                return element.time;

            }

        });



        DataStream<Student> input2=env.fromElements(STUDENTS).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Student>() {

            @Override

            public long extractAscendingTimestamp(Student element) {

                return element.time;

            }

        });

        KeyedStream<Transcript,String>  keyedStream=input1.keyBy(new KeySelector<Transcript, String>() {

            @Override

            public String getKey(Transcript value) throws Exception {

                return value.id;

            }

        });

        KeyedStream<Student,String> otherKeyedStream=input2.keyBy(new KeySelector<Student, String>() {

            @Override

            public String getKey(Student value) throws Exception {

                return value.id;

            }

        });



        //e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound



        // key1 == key2 && leftTs - 2 < rightTs < leftTs + 2


        SingleOutputStreamOperator<Tuple5<String, String, String, String, Integer>> result = keyedStream.intervalJoin(otherKeyedStream)

                .between(Time.milliseconds(-2), Time.milliseconds(2))

                .upperBoundExclusive()

                .lowerBoundExclusive()

                .process(new ProcessJoinFunction<Transcript, Student, Tuple5<String, String, String, String, Integer>>()
                {


        @Override

        public void processElement(Transcript transcript, Student student, Context ctx, Collector<Tuple5<String, String, String, String, Integer>> out) throws Exception
    {

            out.collect(Tuple5.of(transcript.id,transcript.name,student.class_,transcript.subject,transcript.score));

        }

    });
        result.print();

        env.execute();

}

    public static final Transcript[] TRANSCRIPTS = new Transcript[] {

            new Transcript("1","张三","语文",100,System.currentTimeMillis()),

            new Transcript("2","李四","语文",78,System.currentTimeMillis()),

            new Transcript("3","王五","语文",99,System.currentTimeMillis()),

            new Transcript("4","赵六","语文",81,System.currentTimeMillis()),

            new Transcript("5","钱七","语文",59,System.currentTimeMillis()),

            new Transcript("6","马二","语文",97,System.currentTimeMillis())

    };

    public static final Student[] STUDENTS = new Student[] {

            new Student("1","张三","class1",System.currentTimeMillis()),

            new Student("2","李四","class1",System.currentTimeMillis()),

            new Student("3","王五","class1",System.currentTimeMillis()),

            new Student("4","赵六","class2",System.currentTimeMillis()),

            new Student("5","钱七","class2",System.currentTimeMillis()),

            new Student("6","马二","class2",System.currentTimeMillis())

    };


}

//代码来自:
//https://www.cnblogs.com/dajiangtai/p/10711679.html